package com.lagou.flink.work.p3;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;


/**
 * Window function that sums the integer field ({@code f1}) of every element in a
 * keyed time window and emits a single summary line per window.
 *
 * <p>The emitted line contains the first field of the key tuple, the computed sum,
 * and the window start and end rendered as {@code yyyy-MM-dd HH:mm:ss.SSS} in the
 * JVM's default time zone.
 *
 * @author : zhangyong
 * @since : 2021/5/18
 */
public class MyTimeWindow implements WindowFunction<Tuple2<String, Integer>, String,
        Tuple, TimeWindow> {

    /**
     * Formatter for the window boundaries. {@link DateTimeFormatter} is immutable and
     * thread-safe, so one shared instance replaces the formatter that used to be
     * allocated on every call to {@code apply}.
     */
    private static final DateTimeFormatter WINDOW_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

    /**
     * Sums {@code f1} over all elements of the window and emits one formatted result line.
     *
     * @param tuple      the key of the window; only its first field is written to the output
     * @param timeWindow the window being evaluated; its start and end are read as epoch milliseconds
     * @param iterable   the elements assigned to this window
     * @param collector  receives the single formatted result line
     * @throws Exception declared by the {@link WindowFunction} contract
     */
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> element : iterable) {
            sum += element.f1;
        }

        // The window bounds are epoch milliseconds; convert them to instants for formatting.
        String windowStart = WINDOW_TIME_FORMAT.format(Instant.ofEpochMilli(timeWindow.getStart()));
        String windowEnd = WINDOW_TIME_FORMAT.format(Instant.ofEpochMilli(timeWindow.getEnd()));

        collector.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
                + windowStart + " window_end :" + windowEnd
        );
    }
}
